-
Notifications
You must be signed in to change notification settings - Fork 298
feat: allow list items to be processed in parallel #738
base: master
Are you sure you want to change the base?
feat: allow list items to be processed in parallel #738
Conversation
849c153
to
0f1cd01
Compare
Signed-off-by: Shady Rafehi <[email protected]>
0f1cd01
to
4e4d254
Compare
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #738 +/- ##
==========================================
- Coverage 54.26% 47.67% -6.59%
==========================================
Files 64 64
Lines 6164 6611 +447
==========================================
- Hits 3345 3152 -193
- Misses 2549 3199 +650
+ Partials 270 260 -10 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
listPageSize: defaultListPageSize, | ||
listPageBufferSize: defaultListPageBufferSize, | ||
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight), | ||
listItemSemaphoreWeight: defaultListItemSemaphoreWeight, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is great that it maintains existing behavior by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very elegant solution!
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { | ||
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { | ||
if un, ok := obj.(*unstructured.Unstructured); !ok { | ||
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName()) | ||
} else { | ||
items = append(items, c.newResource(un)) | ||
if err := limiter.Run(ctx, func() { | ||
newRes := c.newResource(un) | ||
listLock.Lock() | ||
items = append(items, newRes) | ||
listLock.Unlock() | ||
}); err != nil { | ||
return fmt.Errorf("failed to process list item: %w", err) | ||
} | ||
} | ||
return nil | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to call this out as a trade-off, I think this will cause a change in ordering when using concurrency. But this is only when its enabled. Not a bad trade-off, just something to be aware of.
This PR introduces a new setting which allows items returned by
ListPager
to be processed in parallel.Currently, items are processed sequentially per resource. This is a problem if
populateResourceInfoHandler
is expensive. We internally have a custom resource which takes 1.5 milliseconds in thepopulateResourceInfoHandler
function due to a custom lua healthcheck script and a few ignored labels which are removed during diff normalisation. With over 50,000 of these custom resources, that's a 75 second overhead.With a parallelism of 8, we've seen a page of 500 items go from ~750ms to ~100ms. We've also seen the processing of a full list complete at roughly the same time as
ListPager
completes. Previously, processing of pages constantly lagged behind the retrieval.gobench
results:Before and after: